// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package client

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"math"
	"net/http"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"syscall"
	"time"

	metrics "github.com/hashicorp/go-metrics/compat"
	"github.com/hashicorp/go-msgpack/v2/codec"
	"github.com/hpcloud/tail/watch"

	"github.com/hashicorp/nomad/acl"
	"github.com/hashicorp/nomad/client/allocdir"
	sframer "github.com/hashicorp/nomad/client/lib/streamframer"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/helper/pointer"
	"github.com/hashicorp/nomad/nomad/structs"
)

var (
	allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")
	pathNotPresentErr    = fmt.Errorf("must provide a file path")
	taskNotPresentErr    = fmt.Errorf("must provide task name")
	logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")
	invalidOrigin        = fmt.Errorf("origin must be start or end")
)

const (
	// streamFramesBuffer is the number of stream frames that will be buffered
	// before back pressure is applied on the stream framer.
	streamFramesBuffer = 32

	// streamFrameSize is the maximum number of bytes to send in a single frame
	streamFrameSize = 64 * 1024

	// streamHeartbeatRate is the rate at which a heartbeat will occur to detect
	// a closed connection without sending any additional data
	streamHeartbeatRate = 1 * time.Second

	// streamBatchWindow is the window in which file content is batched before
	// being flushed if the frame size has not been hit.
	streamBatchWindow = 200 * time.Millisecond

	// nextLogCheckRate is the rate at which we check for a log entry greater
	// than what we are watching for. This is to handle the case in which logs
	// rotate faster than we can detect and we have to rely on a normal
	// directory listing.
	nextLogCheckRate = 100 * time.Millisecond

	// deleteEvent and truncateEvent are the file events that can be sent in a
	// StreamFrame
	deleteEvent   = "file deleted"
	truncateEvent = "file truncated"

	// OriginStart and OriginEnd are the available parameters for the origin
	// argument when streaming a file. They respectively offset from the start
	// and end of a file.
	OriginStart = "start"
	OriginEnd   = "end"
)

// FileSystem endpoint is used for accessing the logs and filesystem of
// allocations.
type FileSystem struct {
	c *Client
}

func NewFileSystemEndpoint(c *Client) *FileSystem {
	f := &FileSystem{c}
	f.c.streamingRpcs.Register("FileSystem.Logs", f.logs)
	f.c.streamingRpcs.Register("FileSystem.Stream", f.stream)
	return f
}

// handleStreamResultError is a helper for sending an error with a potential
// error code. The transmission of the error is ignored if the error has been
// generated by the closing of the underlying transport.
func handleStreamResultError(err error, code *int64, encoder *codec.Encoder) {
	// Nothing to do as the conn is closed
	if err == io.EOF || strings.Contains(err.Error(), "closed") {
		return
	}

	encoder.Encode(&cstructs.StreamErrWrapper{
		Error: cstructs.NewRpcError(err, code),
	})
}

// List is used to list the contents of an allocation's directory.
func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListResponse) error {
	defer metrics.MeasureSince([]string{"client", "file_system", "list"}, time.Now())

	alloc, err := f.c.GetAlloc(args.AllocID)
	if err != nil {
		return err
	}

	// Check namespace read-fs permission.
	if aclObj, err := f.c.ResolveToken(args.QueryOptions.AuthToken); err != nil {
		return err
	} else if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) {
		return structs.ErrPermissionDenied
	}

	fs, err := f.c.GetAllocFS(args.AllocID)
	if err != nil {
		return err
	}
	files, err := fs.List(args.Path)
	if err != nil {
		return err
	}

	reply.Files = files
	return nil
}

// Stat is used to stat a file in the allocation's directory.
func (f *FileSystem) Stat(args *cstructs.FsStatRequest, reply *cstructs.FsStatResponse) error {
	defer metrics.MeasureSince([]string{"client", "file_system", "stat"}, time.Now())

	alloc, err := f.c.GetAlloc(args.AllocID)
	if err != nil {
		return err
	}

	// Check namespace read-fs permission.
	if aclObj, err := f.c.ResolveToken(args.QueryOptions.AuthToken); err != nil {
		return err
	} else if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) {
		return structs.ErrPermissionDenied
	}

	fs, err := f.c.GetAllocFS(args.AllocID)
	if err != nil {
		return err
	}
	info, err := fs.Stat(args.Path)
	if err != nil {
		return err
	}

	reply.Info = info
	return nil
}

// stream is is used to stream the contents of file in an allocation's
// directory.
func (f *FileSystem) stream(conn io.ReadWriteCloser) {
	defer metrics.MeasureSince([]string{"client", "file_system", "stream"}, time.Now())
	defer conn.Close()

	// Decode the arguments
	var req cstructs.FsStreamRequest
	decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
	encoder := codec.NewEncoder(conn, structs.MsgpackHandle)

	if err := decoder.Decode(&req); err != nil {
		handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder)
		return
	}

	if req.AllocID == "" {
		handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
		return
	}

	ar, err := f.c.getAllocRunner(req.AllocID)
	if err != nil {
		handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder)
		return
	}
	if ar.IsDestroyed() {
		handleStreamResultError(
			fmt.Errorf("state for allocation %s not found on client", req.AllocID),
			pointer.Of(int64(http.StatusNotFound)),
			encoder,
		)
		return
	}
	alloc := ar.Alloc()

	// Check read permissions
	if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
		handleStreamResultError(err, pointer.Of(int64(http.StatusForbidden)), encoder)
		return
	} else if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) {
		handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(http.StatusForbidden)), encoder)
		return
	}

	// Validate the arguments
	if req.Path == "" {
		handleStreamResultError(pathNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
		return
	}
	switch req.Origin {
	case "start", "end":
	case "":
		req.Origin = "start"
	default:
		handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder)
		return
	}

	fs, err := f.c.GetAllocFS(req.AllocID)
	if err != nil {
		code := pointer.Of(int64(http.StatusInternalServerError))
		if structs.IsErrUnknownAllocation(err) {
			code = pointer.Of(int64(http.StatusNotFound))
		}

		handleStreamResultError(err, code, encoder)
		return
	}

	// Calculate the offset
	fileInfo, err := fs.Stat(req.Path)
	if err != nil {
		handleStreamResultError(err, pointer.Of(int64(http.StatusBadRequest)), encoder)
		return
	}
	if fileInfo.IsDir {
		handleStreamResultError(
			fmt.Errorf("file %q is a directory", req.Path),
			pointer.Of(int64(http.StatusBadRequest)), encoder)
		return
	}

	// If offsetting from the end subtract from the size
	if req.Origin == "end" {
		req.Offset = fileInfo.Size - req.Offset
		if req.Offset < 0 {
			req.Offset = 0
		}
	}

	frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
	errCh := make(chan error)
	var buf bytes.Buffer
	frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)

	// Create the framer
	framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	// If we aren't following end as soon as we hit EOF
	cancelAfterFirstEof := !req.Follow

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start streaming
	go func() {
		if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, nil, cancelAfterFirstEof); err != nil {
			select {
			case errCh <- err:
			case <-ctx.Done():
			}
		}

		framer.Destroy()
	}()

	// Create a goroutine to detect the remote side closing
	go func() {
		for {
			if _, err := conn.Read(nil); err != nil {
				if err == io.EOF || err == io.ErrClosedPipe {
					// One end of the pipe was explicitly closed, exit cleanly
					cancel()
					return
				}
				select {
				case errCh <- err:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	var streamErr error
OUTER:
	for {
		select {
		case streamErr = <-errCh:
			break OUTER
		case frame, ok := <-frames:
			if !ok {
				// frame may have been closed when an error
				// occurred. Check once more for an error.
				select {
				case streamErr = <-errCh:
					// There was a pending error!
				default:
					// No error, continue on
				}

				break OUTER
			}

			var resp cstructs.StreamErrWrapper
			if req.PlainText {
				resp.Payload = frame.Data
			} else {
				if err = frameCodec.Encode(frame); err != nil {
					streamErr = err
					break OUTER
				}

				resp.Payload = buf.Bytes()
				buf.Reset()
			}

			if err := encoder.Encode(resp); err != nil {
				streamErr = err
				break OUTER
			}
			encoder.Reset(conn)
		case <-ctx.Done():
			break OUTER
		}
	}

	if streamErr != nil {
		handleStreamResultError(streamErr, pointer.Of(int64(http.StatusInternalServerError)), encoder)
		return
	}
}

// logs is is used to stream a task's logs.
func (f *FileSystem) logs(conn io.ReadWriteCloser) {
	defer metrics.MeasureSince([]string{"client", "file_system", "logs"}, time.Now())
	defer conn.Close()

	// Decode the arguments
	var req cstructs.FsLogsRequest
	decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
	encoder := codec.NewEncoder(conn, structs.MsgpackHandle)

	if err := decoder.Decode(&req); err != nil {
		handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder)
		return
	}

	if req.AllocID == "" {
		handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
		return
	}

	ar, err := f.c.getAllocRunner(req.AllocID)
	if err != nil {
		handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder)
		return
	}
	if ar.IsDestroyed() {
		handleStreamResultError(
			fmt.Errorf("state for allocation %s not found on client", req.AllocID),
			pointer.Of(int64(http.StatusNotFound)),
			encoder,
		)
		return
	}
	alloc := ar.Alloc()

	// Check read permissions
	aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken)
	if err != nil {
		handleStreamResultError(err, nil, encoder)
		return
	}

	readfs := aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS)
	logs := aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadLogs)
	if !readfs && !logs {
		handleStreamResultError(structs.ErrPermissionDenied, nil, encoder)
		return
	}

	// Validate the arguments
	if req.Task == "" {
		handleStreamResultError(taskNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
		return
	}
	switch req.LogType {
	case "stdout", "stderr":
	default:
		handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder)
		return
	}
	switch req.Origin {
	case "start", "end":
	case "":
		req.Origin = "start"
	default:
		handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder)
		return
	}

	fs, err := f.c.GetAllocFS(req.AllocID)
	if err != nil {
		code := pointer.Of(int64(http.StatusInternalServerError))
		if structs.IsErrUnknownAllocation(err) {
			code = pointer.Of(int64(http.StatusNotFound))
		}

		handleStreamResultError(err, code, encoder)
		return
	}

	allocState, err := f.c.GetAllocState(req.AllocID)
	if err != nil {
		code := pointer.Of(int64(http.StatusInternalServerError))
		if structs.IsErrUnknownAllocation(err) {
			code = pointer.Of(int64(http.StatusNotFound))
		}

		handleStreamResultError(err, code, encoder)
		return
	}

	// Check that the task is there
	taskState := allocState.TaskStates[req.Task]
	if taskState == nil {
		handleStreamResultError(
			fmt.Errorf("unknown task name %q", req.Task),
			pointer.Of(int64(http.StatusBadRequest)),
			encoder)
		return
	}

	if taskState.StartedAt.IsZero() {
		handleStreamResultError(
			fmt.Errorf("task %q not started yet. No logs available", req.Task),
			pointer.Of(int64(http.StatusNotFound)),
			encoder)
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
	errCh := make(chan error)

	// Start streaming
	go func() {
		if err := f.logsImpl(ctx, req.Follow, req.PlainText,
			req.Offset, req.Origin, req.Task, req.LogType, fs, frames); err != nil {
			select {
			case errCh <- err:
			case <-ctx.Done():
			}
		}
	}()

	// Create a goroutine to detect the remote side closing
	go func() {
		for {
			if _, err := conn.Read(nil); err != nil {
				if err == io.EOF || err == io.ErrClosedPipe {
					// One end of the pipe was explicitly closed, exit cleanly
					cancel()
					return
				}
				select {
				case errCh <- err:
				case <-ctx.Done():
				}
				return
			}
		}
	}()

	var streamErr error
	buf := new(bytes.Buffer)
	frameCodec := codec.NewEncoder(buf, structs.JsonHandle)
OUTER:
	for {
		select {
		case streamErr = <-errCh:
			break OUTER
		case frame, ok := <-frames:
			if !ok {
				// framer may have been closed when an error
				// occurred. Check once more for an error.
				select {
				case streamErr = <-errCh:
					// There was a pending error!
				default:
					// No error, continue on
				}

				break OUTER
			}

			var resp cstructs.StreamErrWrapper
			if req.PlainText {
				resp.Payload = frame.Data
			} else {
				if err = frameCodec.Encode(frame); err != nil {
					streamErr = err
					break OUTER
				}
				frameCodec.Reset(buf)

				resp.Payload = buf.Bytes()
				buf.Reset()
			}

			if err := encoder.Encode(resp); err != nil {
				streamErr = err
				break OUTER
			}
			encoder.Reset(conn)
		}
	}

	if streamErr != nil {
		// If error has a Code, use it
		var code int64 = http.StatusInternalServerError
		if codedErr, ok := streamErr.(interface{ Code() int }); ok {
			code = int64(codedErr.Code())
		}
		handleStreamResultError(streamErr, &code, encoder)
		return
	}
}

// logsImpl is used to stream the logs of a the given task. Output is sent on
// the passed frames channel and the method will return on EOF if follow is not
// true otherwise when the context is cancelled or on an error.
func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset int64,
	origin, task, logType string,
	fs allocdir.AllocDirFS, frames chan<- *sframer.StreamFrame) error {

	// Create the framer
	framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
	framer.Run()
	defer framer.Destroy()

	// Path to the logs
	logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName)

	// nextIdx is the next index to read logs from
	var nextIdx int64
	switch origin {
	case "start":
		nextIdx = 0
	case "end":
		nextIdx = math.MaxInt64
		offset *= -1
	default:
		return invalidOrigin
	}

	for {
		// Logic for picking next file is:
		// 1) List log files
		// 2) Pick log file closest to desired index
		// 3) Open log file at correct offset
		// 3a) No error, read contents
		// 3b) If file doesn't exist, goto 1 as it may have been rotated out
		entries, err := fs.List(logPath)
		if err != nil {
			return fmt.Errorf("failed to list entries: %v", err)
		}

		// If we are not following logs, determine the max index for the logs we are
		// interested in so we can stop there.
		maxIndex := int64(math.MaxInt64)
		if !follow {
			_, idx, _, err := findClosest(entries, maxIndex, 0, task, logType)
			if err != nil {
				return err
			}
			maxIndex = idx
		}

		logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType)
		if err != nil {
			return err
		}

		var eofCancelCh chan error
		cancelAfterFirstEof := false
		exitAfter := false
		if !follow && idx > maxIndex {
			// Exceeded what was there initially so return
			return nil
		} else if !follow && idx == maxIndex {
			// At the end
			cancelAfterFirstEof = true
			exitAfter = true
		} else {
			eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1)
		}

		p := filepath.Join(logPath, logEntry.Name)
		err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, cancelAfterFirstEof)

		// Check if the context is cancelled
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		if err != nil {
			// Check if there was an error where the file does not exist. That means
			// it got rotated out from under us.
			if os.IsNotExist(err) {
				continue
			}

			// Check if the connection was closed
			if err == syscall.EPIPE {
				return nil
			}

			return fmt.Errorf("failed to stream %q: %v", p, err)
		}

		if exitAfter {
			return nil
		}

		// defensively check to make sure StreamFramer hasn't stopped
		// running to avoid tight loops with goroutine leaks as in
		// #3342
		select {
		case <-framer.ExitCh():
			return nil
		default:
		}

		// Since we successfully streamed, update the overall offset/idx.
		offset = int64(0)
		nextIdx = idx + 1
	}
}

// streamFile is the internal method to stream the content of a file. If limit
// is greater than zero, the stream will end once that many bytes have been
// read. If eofCancelCh is triggered while at EOF, read one more frame and
// cancel the stream on the next EOF. If the connection is broken an EPIPE
// error is returned.
func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64,
	fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, cancelAfterFirstEof bool) error {

	// Get the reader
	file, err := fs.ReadAt(path, offset)
	if err != nil {
		return err
	}
	defer file.Close()

	var fileReader io.Reader
	if limit <= 0 {
		fileReader = file
	} else {
		fileReader = io.LimitReader(file, limit)
	}

	// Create a tomb to cancel watch events
	waitCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Create a variable to allow setting the last event
	var lastEvent string

	// Only create the file change watcher once. But we need to do it after we
	// read and reach EOF.
	var changes *watch.FileChanges

	// Only watch file when there is a need for it
	cancelReceived := cancelAfterFirstEof

	// Start streaming the data
	bufSize := int64(streamFrameSize)
	if limit > 0 && limit < streamFrameSize {
		bufSize = limit
	}
	data := make([]byte, bufSize)
OUTER:
	for {
		// Read up to the max frame size
		n, readErr := fileReader.Read(data)

		// Update the offset
		offset += int64(n)

		// Return non-EOF errors
		if readErr != nil && readErr != io.EOF {
			return readErr
		}

		// Send the frame
		if n != 0 || lastEvent != "" {
			if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
				return parseFramerErr(err)
			}
		}

		// Clear the last event
		if lastEvent != "" {
			lastEvent = ""
		}

		// Just keep reading since we aren't at the end of the file so we can
		// avoid setting up a file event watcher.
		if readErr == nil {
			continue
		}

		// At this point we can stop without waiting for more changes,
		// because we have EOF and either we're not following at all,
		// or we received an event from the eofCancelCh channel
		// and last read was executed
		if cancelReceived {
			return nil
		}

		// If EOF is hit, wait for a change to the file
		if changes == nil {
			changes, err = fs.ChangeEvents(waitCtx, path, offset)
			if err != nil {
				return err
			}
		}

		for {
			select {
			case <-changes.Modified:
				continue OUTER
			case <-changes.Deleted:
				return parseFramerErr(framer.Send(path, deleteEvent, nil, offset))
			case <-changes.Truncated:
				// Close the current reader
				if err := file.Close(); err != nil {
					return err
				}

				// Get a new reader at offset zero
				offset = 0
				var err error
				file, err = fs.ReadAt(path, offset)
				if err != nil {
					return err
				}
				defer file.Close()

				if limit <= 0 {
					fileReader = file
				} else {
					// Get the current limit
					lr, ok := fileReader.(*io.LimitedReader)
					if !ok {
						return fmt.Errorf("unable to determine remaining read limit")
					}

					fileReader = io.LimitReader(file, lr.N)
				}

				// Store the last event
				lastEvent = truncateEvent
				continue OUTER
			case <-framer.ExitCh():
				return nil
			case <-ctx.Done():
				return nil
			case _, ok := <-eofCancelCh:
				if !ok {
					return nil
				}

				if err != nil {
					return err
				}

				// try to read one more frame to avoid dropped entries
				// during log rotation
				cancelReceived = true
				continue OUTER
			}
		}
	}
}

// blockUntilNextLog returns a channel that will have data sent when the next
// log index or anything greater is created.
func blockUntilNextLog(ctx context.Context, fs allocdir.AllocDirFS, logPath, task, logType string, nextIndex int64) chan error {
	nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, nextIndex))
	next := make(chan error, 1)

	go func() {
		eofCancelCh, err := fs.BlockUntilExists(ctx, nextPath)
		if err != nil {
			next <- err
			close(next)
			return
		}

		ticker := time.NewTicker(nextLogCheckRate)
		defer ticker.Stop()
		scanCh := ticker.C
		for {
			select {
			case <-ctx.Done():
				next <- nil
				close(next)
				return
			case err := <-eofCancelCh:
				next <- err
				close(next)
				return
			case <-scanCh:
				entries, err := fs.List(logPath)
				if err != nil {
					next <- fmt.Errorf("failed to list entries: %v", err)
					close(next)
					return
				}

				indexes, err := logIndexes(entries, task, logType)
				if err != nil {
					next <- err
					close(next)
					return
				}

				// Scan and see if there are any entries larger than what we are
				// waiting for.
				for _, entry := range indexes {
					if entry.idx >= nextIndex {
						next <- nil
						close(next)
						return
					}
				}
			}
		}
	}()

	return next
}

// indexTuple and indexTupleArray are used to find the correct log entry to
// start streaming logs from
type indexTuple struct {
	idx   int64
	entry *cstructs.AllocFileInfo
}

type indexTupleArray []indexTuple

func (a indexTupleArray) Len() int           { return len(a) }
func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx }
func (a indexTupleArray) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

// logIndexes takes a set of entries and returns a indexTupleArray of
// the desired log file entries. If the indexes could not be determined, an
// error is returned.
func logIndexes(entries []*cstructs.AllocFileInfo, task, logType string) (indexTupleArray, error) {
	var indexes []indexTuple
	prefix := fmt.Sprintf("%s.%s.", task, logType)
	for _, entry := range entries {
		if entry.IsDir {
			continue
		}

		// If nothing was trimmed, then it is not a match
		idxStr := strings.TrimPrefix(entry.Name, prefix)
		if idxStr == entry.Name {
			continue
		}

		// Convert to an int
		idx, err := strconv.Atoi(idxStr)
		if err != nil {
			return nil, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
		}

		indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry})
	}

	return indexTupleArray(indexes), nil
}

// notFoundErr is returned when a log is requested but cannot be found.
// Implements agent.HTTPCodedError but does not reference it to avoid circular
// imports.
type notFoundErr struct {
	taskName string
	logType  string
}

func (e notFoundErr) Error() string {
	return fmt.Sprintf("log entry for task %q and log type %q not found", e.taskName, e.logType)
}

// Code returns a 404 to avoid returning a 500
func (e notFoundErr) Code() int {
	return http.StatusNotFound
}

// findClosest takes a list of entries, the desired log index and desired log
// offset (which can be negative, treated as offset from end), task name and log
// type and returns the log entry, the log index, the offset to read from and a
// potential error.
func findClosest(entries []*cstructs.AllocFileInfo, desiredIdx, desiredOffset int64,
	task, logType string) (*cstructs.AllocFileInfo, int64, int64, error) {

	// Build the matching indexes
	indexes, err := logIndexes(entries, task, logType)
	if err != nil {
		return nil, 0, 0, err
	}
	if len(indexes) == 0 {
		return nil, 0, 0, notFoundErr{taskName: task, logType: logType}
	}

	// Binary search the indexes to get the desiredIdx
	sort.Sort(indexes)
	i := sort.Search(len(indexes), func(i int) bool { return indexes[i].idx >= desiredIdx })
	l := len(indexes)
	if i == l {
		// Use the last index if the number is bigger than all of them.
		i = l - 1
	}

	// Get to the correct offset
	offset := desiredOffset
	idx := int64(i)
	for {
		s := indexes[idx].entry.Size

		// Base case
		if offset == 0 {
			break
		} else if offset < 0 {
			// Going backwards
			if newOffset := s + offset; newOffset >= 0 {
				// Current file works
				offset = newOffset
				break
			} else if idx == 0 {
				// Already at the end
				offset = 0
				break
			} else {
				// Try the file before
				offset = newOffset
				idx -= 1
				continue
			}
		} else {
			// Going forward
			if offset <= s {
				// Current file works
				break
			} else if idx == int64(l-1) {
				// Already at the end
				offset = s
				break
			} else {
				// Try the next file
				offset = offset - s
				idx += 1
				continue
			}

		}
	}

	return indexes[idx].entry, indexes[idx].idx, offset, nil
}

// parseFramerErr takes an error and returns an error. The error will
// potentially change if it was caused by the connection being closed.
func parseFramerErr(err error) error {
	if err == nil {
		return nil
	}

	errMsg := err.Error()

	if strings.Contains(errMsg, io.ErrClosedPipe.Error()) {
		// The pipe check is for tests
		return syscall.EPIPE
	}

	// The connection was closed by our peer
	if strings.Contains(errMsg, syscall.EPIPE.Error()) || strings.Contains(errMsg, syscall.ECONNRESET.Error()) {
		return syscall.EPIPE
	}

	// Windows version of ECONNRESET
	//XXX(schmichael) I could find no existing error or constant to
	//                compare this against.
	if strings.Contains(errMsg, "forcibly closed") {
		return syscall.EPIPE
	}

	return err
}
